Skip to content

Instantly share code, notes, and snippets.

@diegofps
Created May 13, 2023 02:23
Show Gist options
  • Save diegofps/87945a0c3e800c747f3af07833ff6b7e to your computer and use it in GitHub Desktop.
Save diegofps/87945a0c3e800c747f3af07833ff6b7e to your computer and use it in GitHub Desktop.
A simple Python gateway implemention
#!/usr/bin/env python3
from threading import Thread, Lock
from queue import Queue
import logging as log
import traceback
import random
import time
import sys
log.basicConfig(format='%(name)s - %(levelname)s - %(message)s', level=log.DEBUG)
class MqttReader(IotReader):
def __init__(self, queue_master):
super().__init__(queue_master, 'MqttReader')
class MqttWriter(IotWriter):
def __init__(self):
super().__init__('MqttWriter')
class AnySerialReader:
def __init__(self, name="AnySerialReader"):
self.name = name
self.next_event = time.time() + random.random() * 5
def read(self, timeout=None):
now = time.time()
if now < self.next_event:
return None
self.next_event = time.time() + random.random() * 5
data = random.choice(['event_a', 'event_b', 'event_c'])
log.debug('')
log.debug(f'{self.name} => {data}')
return data
def terminate(self):
log.warning(f"Terminating {self.name}")
class AnySerialWriter:
def __init__(self, name="AnySerialWriter"):
self.name = name
def write(self, data):
log.debug(f'{self.name} <= {data}')
def terminate(self):
log.warning(f"Terminating {self.name}")
class IotReader(Thread):
def __init__(self, queue_master, name='IotReader'):
super().__init__()
self.queue_master = queue_master
self.queue = Queue()
self.done = False
self.name = name
self.start()
def terminate(self):
self.done = True
def run(self):
log.info(f"Starting thread for {self.name}")
serial_reader = AnySerialReader('Serial' + self.name)
log.info(f"Serial reader for {self.name} initialized")
while not self.done:
try:
data = serial_reader.read(timeout=1)
if data is None:
continue
self.queue_master.put(('on_iot_event', data))
except:
traceback.print_exc(file=sys.stdout)
log.warning("Terminating IotReader")
serial_reader.terminate()
class IotWriter(Thread):
def __init__(self, name='IotWriter'):
super().__init__()
self.queue = Queue()
self.done = False
self.name = name
self.start()
def terminate(self):
self.done = True
self.queue.put( ('terminate', None) )
def send(self, data):
self.queue.put( ('write_message', data) )
def run(self):
log.info(f"Starting thread for {self.name}")
serial_writer = AnySerialWriter('Serial' + self.name)
log.info(f"Serial writer for {self.name} initialized")
while not self.done:
try:
action, data = self.queue.get()
if action == 'terminate':
break
elif action == 'write_message':
serial_writer.write(data)
else:
log.error(f'Unknown action for IotWriter - action={action}, data={data}')
except:
traceback.print_exc(file=sys.stdout)
log.warning("Terminating IotWriter")
serial_writer.terminate()
class Gateway(Thread):
def __init__(self):
super().__init__()
self.queue_master = Queue()
self.done = False
self.start()
def run(self):
log.info("Starting Gateway")
while not self.done:
try:
self.iot_reader = IotReader(self.queue_master)
self.iot_writer = IotWriter()
self.mqtt_reader = MqttReader(self.queue_master)
self.mqtt_writer = MqttWriter()
log.info(f"Starting {self.__class__.__name__}")
while not self.done:
try:
action, data = self.queue_master.get()
if action in 'on_mqtt_event':
self.on_mqtt_event(data)
elif action == 'on_iot_event':
self.on_iot_event(data)
elif action == 'terminate':
break
else:
log.error(f'Unknown action, action={action}, data={data}')
except:
log.error("Error during message parsing")
traceback.print_exc(file=sys.stdout)
except:
log.error("Error during gateway configuration")
traceback.print_exc(file=sys.stdout)
self.iot_reader.terminate()
self.iot_writer.terminate()
self.mqtt_reader.terminate()
self.mqtt_writer.terminate()
self.iot_reader.join()
self.iot_writer.join()
self.mqtt_reader.join()
self.mqtt_writer.join()
log.warning('Terminating Gateway')
def terminate(self):
self.done = True
self.queue_master.put(('terminate', None))
def on_iot_event(self, data):
log.info(f'Event from iot device, forwarding to mqtt, data={data}')
self.mqtt_writer.send(data)
def on_mqtt_event(self, data):
log.info(f'Event from iot device, forwarding to iot, data={data}')
self.iot_writer.send(data)
gateway = Gateway()
# Your main thread is free here, you could start a webserver and display
# a dashboard. Or just wait, like below.
try:
gateway.join()
except KeyboardInterrupt:
log.info("Sending terminate command...")
gateway.terminate()
try:
gateway.join()
except KeyboardInterrupt:
log.info("Killing the app...")
sys.exit(0)
pass
log.info("Bye!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment